Skip to content

Add citus.allow_unsafe_insert_select_pushdown for shard-local batched INSERT ... SELECT#8625

Draft
onurctirtir wants to merge 21 commits into
citusdata:mainfrom
onurctirtir:onurctirtir-automatic-broccoli
Draft

Add citus.allow_unsafe_insert_select_pushdown for shard-local batched INSERT ... SELECT#8625
onurctirtir wants to merge 21 commits into
citusdata:mainfrom
onurctirtir:onurctirtir-automatic-broccoli

Conversation

@onurctirtir

@onurctirtir onurctirtir commented Jun 29, 2026

Copy link
Copy Markdown
Member

DESCRIPTION: Adds citus.allow_unsafe_insert_select_pushdown to allow maybe unsafe shard-local batched INSERT .. SELECT ..

Some workloads want to call an expensive, batch-oriented UDF from a colocated
INSERT … SELECT — e.g. process_array(text[]), which returns something
per element, in order. The natural way to use it is to batch rows with array_agg
(+ a window function to form batches), call the UDF once per batch, then unnest
the result back to one row per input and insert it.

On a distributed table this SELECT looks like it "requires a merge" (e.g., GROUP
BY / window on a non-distribution column), so Citus today falls back to
pull-to-coordinator: it pulls every shard's rows to the coordinator, batches
there, and calls the UDF there. That defeats the purpose — the batching and the
expensive call should run on the shards.

What this adds

With citus.allow_unsafe_insert_select_pushdown (default off), for a
colocated INSERT … SELECT, it relaxes the merge-step checks that would
otherwise cause pull-to-coordinator, so the whole SELECT — grouping, window,
and the batch UDF — is pushed down and runs shard-local, one task per shard.

Concretely, when the GUC is on:

  • GROUP BY / aggregates / HAVING / window / DISTINCT on non-distribution columns
    no longer force a merge step — for the top-level SELECT and the subqueries.
  • The INSERT's distribution-column target entry no longer has to match a SELECT
    partition column as a plain Var. Instead it may be a provably shard-local
    batch pass-through of the distribution column
    , i.e.
    unnest(array_agg(dist_key)) (InsertPartitionColumnMatchesSelect is skipped
    only when this exact pattern is detected). Because those values are read
    straight from this shard's rows and — since source and target are colocated —
    hash right back into this shard's range, routing stays correct. The NOT NULL
    filter Citus normally injects on that column is skipped too, since there is no
    plain Var to attach it to.

However, the following are still enforced:

  • The distribution value must be the unnest(array_agg(dist_key)) pass-through
    (or a plain Var).
    Any other derived distribution value — e.g.
    dist_key + 1, length(text_col), or unnest(f(array_agg(dist_key))) — is
    still rejected even with the GUC on, because it could produce values that hash
    to a different shard and silently misroute rows.
  • Colocation of the INSERT target and all SELECT source relations is fully
    enforced. This is what keeps every batch shard-local, and is the reason the
    relaxation is sound rather than arbitrary.
  • Volatile functions are still rejected from pushdown (the batch UDF is
    expected to be immutable, so this ban is not relaxed).
  • LIMIT/OFFSET is still rejected.

Within those guardrails the user still takes responsibility for making each batch
order-preserving (e.g. array_agg(... ORDER BY ...)) so generated rows line up.

EXPLAIN — before / after

The batch shape: bucket rows into fixed-size batches with row_number()/batch_size,
array_agg each batch (id and text in the same order), call the batch UDF once
per batch, then unnest back to one row per id.

Default (off) — batching happens after pulling every shard's rows to the
coordinator, so the window scan runs a distributed subplan and the grouping +
UDF run in a single coordinator task over the intermediate results:

EXPLAIN (COSTS OFF) INSERT INTO res(text_id, val)
SELECT id, v FROM (
  SELECT
    unnest(array_agg(text_id  ORDER BY text_id)) id,
    unnest(process_array(array_agg(text_col ORDER BY text_id))) v
  FROM (
    SELECT text_id, text_col, (row_number() OVER () - 1) / 100 AS batch FROM dist
  ) q
  GROUP BY batch
) s;

 Custom Scan (Citus INSERT ... SELECT)
   INSERT/SELECT method: pull to coordinator
   ->  Custom Scan (Citus Adaptive)
         ->  Distributed Subplan XXX_1
               ->  WindowAgg
                     ->  Custom Scan (Citus Adaptive)
                           Task Count: 4
                           Tasks Shown: One of 4
                           ->  Task
                                 ->  Seq Scan on dist_14000000 dist
         Task Count: 1
         ->  Task
               ->  Subquery Scan on s
                     ->  ProjectSet
                           ->  GroupAggregate
                                 Group Key: intermediate_result.batch
                                 ->  Sort
                                       Sort Key: intermediate_result.batch, intermediate_result.text_id
                                       ->  Function Scan on read_intermediate_result intermediate_result

With citus.allow_unsafe_insert_select_pushdown = on — the window, grouping, the
batch UDF, and the INSERT all run shard-local, one task per shard, with no
distributed subplan / intermediate results:

 Custom Scan (Citus Adaptive)
   Task Count: 4
   Tasks Shown: One of 4
   ->  Task
         ->  Insert on res_14000004 citus_table_alias
               ->  Subquery Scan on s
                     ->  ProjectSet
                           ->  GroupAggregate
                                 Group Key: q.batch
                                 ->  Sort
                                       Sort Key: q.batch, q.text_id
                                       ->  Subquery Scan on q
                                             ->  WindowAgg
                                                   ->  Seq Scan on dist_14000000 dist

Onur Tirtir and others added 9 commits June 29, 2026 11:49
…tching pushdown

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@codecov

codecov Bot commented Jun 29, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 89.54248% with 16 lines in your changes missing coverage. Please review.
✅ Project coverage is 88.72%. Comparing base (efa65fc) to head (8ede5f5).

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #8625      +/-   ##
==========================================
- Coverage   88.73%   88.72%   -0.02%     
==========================================
  Files         288      288              
  Lines       64384    64442      +58     
  Branches     8108     8128      +20     
==========================================
+ Hits        57133    57175      +42     
- Misses       4909     4916       +7     
- Partials     2342     2351       +9     
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Onur Tirtir and others added 2 commits June 30, 2026 10:59
Fixes check-style ci/check_gucs_are_alphabetically_sorted.sh.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Wrap the two EXPLAIN statements in public.explain_filter(..., true) so the
PG18-only "Window:" line is stripped and the plan footer row counts match
across supported Postgres versions. Regenerate the expected output, which
also re-syncs blank lines that had drifted from the .sql.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@onurctirtir onurctirtir force-pushed the onurctirtir-automatic-broccoli branch from 7e67c16 to f6f01e7 Compare June 30, 2026 08:25
onurctirtir and others added 5 commits June 30, 2026 08:57
Validate the shard-local INSERT..SELECT batching pushdown plan directly on
PG18 (no public.explain_filter), so the PG18-only WindowAgg "Window:" line
is exercised in pg18.sql.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Cover each relaxed branch (volatile fn, derived insert dist col, GROUP BY
on non-dist col, aggregates and HAVING without GROUP BY, window not on
dist col, DISTINCT without dist col), a few combinations, and the same
constructs nested in subqueries. Add negative tests showing the GUC does
not apply to reference-table targets and never relaxes LIMIT/OFFSET.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The unsafe insert-select pushdown GUC no longer relaxes the volatile
function ban: a batched-embeddings style UDF is immutable, so the GUC only
needs to relax grouping / window / aggregate / DISTINCT on non-distribution
columns and partition-column matching. Volatile functions stay blocked from
shard pushdown and fall back to the coordinator plan as before.

- insert_select_planner.c: always defer on volatile functions
- shared_library_init.c: drop "volatile functions" from the GUC description
- tests: make batch_transform IMMUTABLE PARALLEL SAFE, replace the volatile
  EXPLAIN test with the batched-embeddings benchmark query shape
  (row_number()/batch_size bucketing + array_agg + batch UDF + unnest zip-back)
  as the SELECT of an INSERT .. SELECT, with an id<->val correctness check,
  and a negative test showing a volatile function is still not pushed down
  even with the GUC enabled.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
PG18 emits 'Insert on res_... citus_table_alias' for the shard-local INSERT..SELECT pushdown plan; the committed pg18.out was missing the alias, causing the pg18 test (and all Test flakyness shards) to fail on CI. Match CI output.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@onurctirtir

onurctirtir commented Jul 1, 2026

Copy link
Copy Markdown
Member Author

Copilot Suggestion


⚠️ Blocking correctness issue: pushed-down batch pass-through can silently insert NULL distribution keys

The PR treats unnest(array_agg(dist_key)) as a safe shard-local pass-through and then skips AddPartitionKeyNotNullFilterToSelect() for that shape. But PostgreSQL ProjectSet pads shorter set-returning targetlist expressions with NULL when sibling SRFs emit more rows. That means the target distribution column can become NULL even though direct Citus inserts reject NULL partition keys.

Minimal repro from an isolated ~/citus-iso/pr8625-review cluster:

SET citus.allow_unsafe_insert_select_pushdown TO on;
TRUNCATE res;

EXPLAIN (COSTS OFF)
INSERT INTO res(k, val)
SELECT unnest(array_agg(k ORDER BY k)),
       unnest(array_append(array_agg(v ORDER BY k), 'extra-value'))
FROM dist_a
GROUP BY grp;

INSERT INTO res(k, val)
SELECT unnest(array_agg(k ORDER BY k)),
       unnest(array_append(array_agg(v ORDER BY k), 'extra-value'))
FROM dist_a
GROUP BY grp;

SELECT count(*) AS total,
       count(*) FILTER (WHERE k IS NULL) AS null_keys
FROM res;

Observed:

Custom Scan (Citus Adaptive)
  Task Count: 4
  -> Insert on res_102016 citus_table_alias
     -> ProjectSet
        -> GroupAggregate

INSERT 0 106
 total | null_keys
-------+-----------
   106 |        26

Direct coordinator routing still rejects the same invalid key shape:

INSERT INTO res(k, val) VALUES (NULL, 'direct-null-still-rejected');
-- ERROR: cannot perform an INSERT with NULL in the partition column

A second variant also reproduces it:

INSERT INTO res(k, val)
SELECT unnest(array_agg(k ORDER BY k) FILTER (WHERE k % 2 = 0)),
       unnest(array_agg(v ORDER BY k))
FROM dist_a
GROUP BY grp;
-- INSERT 0 80; 40 NULL distribution keys written

Why this matters: the safety argument says the distribution value is read straight from the shard's source rows and hashes back to the same shard. That is false for padded NULL rows produced by ProjectSet. It also bypasses Citus' normal "no NULL partition column" insert invariant unless the user happened to declare the distribution column NOT NULL, in which case the worker shard constraint errors after pushdown.

Relevant code paths:

  • multi_logical_optimizer.c: IsBatchUnnestArrayAggPartitionColumn() documents the expression as “provably shard-local” and explicitly allows ORDER BY / DISTINCT / FILTER modifiers.
  • multi_router_planner.c: AddPartitionKeyNotNullFilterToSelect() skips the partition-key IS NOT NULL filter when the batch pass-through pattern is found.

Recommendation: don't merge until this invariant is addressed. At minimum, add regression tests for value-side extra rows and distribution-side FILTER/shorter SRF. Design-wise, the implementation needs a way to preserve the “no NULL partition key” invariant for batch pass-through; simply trusting unnest(array_agg(dist_key)) is not enough because sibling SRFs can synthesize NULLs at the output row level.

@onurctirtir

onurctirtir commented Jul 1, 2026

Copy link
Copy Markdown
Member Author

Copilot Suggestion


avoid using the global GUC inside AddPartitionKeyNotNullFilterToSelect().

Right now the helper decides to skip the partition-key IS NOT NULL filter by checking AllowUnsafeInsertSelectPushdown and scanning for any unnest(array_agg(dist_key))-like target entry. That makes a generic router/deparse helper depend on global session state rather than on the exact insert-select validation result.

I think it would be safer to pass an explicit “the SELECT target feeding the INSERT distribution column was validated as batch pass-through” signal/expression from the insert-select planning path. That would keep this exception local to the already-validated query shape and avoid future accidental skips if the helper is reached from another path while the GUC is on.

@onurctirtir

onurctirtir commented Jul 1, 2026

Copy link
Copy Markdown
Member Author

Copilot Suggestion


deduplicate the INSERT distribution-target lookup.

InsertPartitionColumnMatchesSelect() and InsertPartitionColumnIsBatchPassThrough() both walk the INSERT target list to find the SELECT target entry that feeds the target table's distribution column. Since the new exception depends on matching exactly the same target position as the existing plain-Var validation, it would be less brittle to extract a shared helper that returns that SELECT TargetEntry * (or an error/null result).

That would reduce drift risk if target-list handling changes later, and it would make it clearer that both validation paths are checking the same INSERT distribution-column mapping.

@onurctirtir

onurctirtir commented Jul 1, 2026

Copy link
Copy Markdown
Member Author

Copilot Suggestion


consider moving/renaming IsBatchUnnestArrayAggPartitionColumn() to make the scope narrower.

The current name and location in multi_logical_optimizer.c make it sound like a broadly reusable partition-column predicate. In practice, it is a very specific exception for colocated INSERT ... SELECT batching: “is this target expression acceptable as the INSERT distribution-column pass-through under the unsafe GUC?”

A more localized name/location, or a name that mentions insert-select batch pass-through explicitly, would make accidental reuse less likely.

@onurctirtir

onurctirtir commented Jul 1, 2026

Copy link
Copy Markdown
Member Author

Copilot Suggestion


tighten the safety wording in comments/docs after the invariant is fixed.

Several comments describe unnest(array_agg(dist_key)) as “provably shard-local” because values come from the current shard and hash back to the same shard. That is the intended argument, but it is only true if the output row's distribution value is guaranteed to be one of those original values and not a targetlist/SRF artifact such as a padded NULL.

Even with this being an explicitly unsafe/user-opt-in GUC, I think the comments should state the precise invariant being preserved rather than the broader “provably shard-local” wording.

Onur Tirtir and others added 5 commits July 2, 2026 11:25
Extract the shared INSERT target-list walk that maps the target table's
distribution column to the SELECT (subquery) target entry into a single
helper, SelectTargetEntryForInsertPartitionColumn(). Both
InsertPartitionColumnMatchesSelect() and
InsertPartitionColumnIsBatchPassThrough() previously duplicated this
lookup; they now call the helper.

The helper optionally reports the matching INSERT target entry so
InsertPartitionColumnMatchesSelect() can still inspect casting on the
distribution column. Pure refactor; no behavior change.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
AddPartitionKeyNotNullFilterToSelect() previously consulted the global
GUC AllowUnsafeInsertSelectPushdown (and re-walked the target list) to
decide whether to skip the NOT NULL filter for a batch pass-through
distribution column. That coupled a low-level deparse/router helper to a
planner-level GUC and re-derived a fact the caller already knows.

Instead, take a bool distributionColumnIsBatchPassThrough parameter and
skip the filter when it is set and there is no plain-Var partition
column. Both callers (RouterModifyTaskForShardInterval and
deparse_shard_query) compute the flag via the now-exported
InsertPartitionColumnIsBatchPassThrough(), which inspects the query
shape directly. A batch-shape query with the GUC off is rejected during
planning and never reaches these call sites, so the query-shape
predicate is the correct signal here.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
IsBatchUnnestArrayAggPartitionColumn() lived in multi_logical_optimizer.c
and was exported, but after the previous commit its only caller is
InsertPartitionColumnIsBatchPassThrough() in insert_select_planner.c.

Move it next to its caller as a file-local static and rename it to
IsInsertSelectBatchPassThroughDistributionColumn(), which better
describes what it checks and where it is used. Drop the header export and
add utils/fmgroids.h to insert_select_planner.c for the array_agg/unnest
function OIDs. No behavior change.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Several comments described unnest(array_agg(dist_key)) as "provably
shard-local". That overstates what the shape check proves: it only
validates the distribution-column expression, and its safety argument
holds only if the row reaching the INSERT actually carries one of the
original source-row distribution values rather than a targetlist/SRF
artifact (e.g. a NULL-padded row from a set-returning function).

Reword the acceptance-site comment, the moved predicate's doc, the
InsertPartitionColumnIsBatchPassThrough doc, and the router NOT NULL
filter comment to state the precise invariant and explicitly call out
the unguarded SRF NULL-padding gap as the reason the GUC is unsafe.
Comments only; no behavior change.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
SelectTargetEntryForInsertPartitionColumn() (added when de-duplicating the
INSERT distribution-column lookup) unconditionally dereferences the target
table's distribution column via PartitionColumn(insertRelationId, 1). For
single-shard (null distribution key) target tables that helper returns NULL,
so the subsequent insertPartitionColumn->varattno access segfaulted the
backend.

This path is reached from RouterModifyTaskForShardInterval when a shard-key
source is inserted into a single-shard target (e.g.
INSERT INTO nullkey_c1_t1 SELECT * FROM range_table), because
InsertPartitionColumnIsBatchPassThrough() is now computed unconditionally at
the call site to feed AddPartitionKeyNotNullFilterToSelect(). The earlier
InsertPartitionColumnMatchesSelect() caller was guarded by
HasDistributionKey(targetRelationId), which is why the crash only surfaced on
the batch pass-through path.

Return NULL early when the target has no distribution column. This restores
the pre-refactor behavior: the batch pass-through check reports false and the
plain-Var NOT NULL filter is still added as before.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant